<?php
use Workerman\Worker;
use Workerman\Lib\Timer;
use Beanstalk\Client;
require_once __DIR__ . '/Workerman/Autoloader.php';
require_once __DIR__ . '/Channel/src/Server.php';
require_once __DIR__ . '/Channel/src/Client.php';
require_once __DIR__ . '/mysql/src/Connection.php';  //mysql组件
require_once __DIR__ . '/Beanstalk/Client.php';

// Create a Worker that speaks the websocket protocol on 172.17.73.202:9090.
$worker = new Worker("websocket://172.17.73.202:9090");
// Channel server used for publish/subscribe between processes; bound to all
// interfaces on port 2206 (the worker connects to it through 127.0.0.1).
// NOTE(review): binding 0.0.0.0 looks wider than necessary — confirm that
// remote Channel clients are actually expected.
$channel_server = new Channel\Server('0.0.0.0', 2206);
// Heartbeat timeout in seconds: a connection that stays silent for longer than
// this is closed by the heartbeat timer registered in onWorkerStart.
define('HEARTBEAT_TIME', 30);
// Name of this worker instance.
$worker->name = 'MyWebsocketWorker';
// Number of processes serving clients (currently a single process).
$worker->count = 1;

/**
 * Per-process bootstrap, run once when a worker process starts.
 *
 * - connects this process to the Channel server on 127.0.0.1:2206;
 * - opens the MySQL connection and publishes it through the global $db
 *   (the onMessage handler reads it from there);
 * - subscribes to the Channel events "push", "time_push", "over", "push_all"
 *   and "send", each of which acts on the connections held by THIS process;
 * - starts the one-second heartbeat timer that drops silent connections.
 *
 * @param Worker $worker the worker whose process has just started
 */
$worker->onWorkerStart = function($worker) {
    Channel\Client::connect('127.0.0.1', 2206);

    // Create the database connection.
    // NOTE(review): host, user and password are hard-coded in source control;
    // they should come from configuration/environment and be rotated.
    global $db;
    $db = new \Workerman\MySQL\Connection('rm-uf6oi8jxh9hkx6xaeo.mysql.rds.aliyuncs.com', '3306', 'liuyu', 'SUIbian1995', 'bento');

    // True when $connection has logged in as $user_id. Connections that never
    // logged in have no user_id property, and an event without a user_id must
    // not match them (previously null == null matched every anonymous client).
    $belongs_to = function($connection, $user_id) {
        return $user_id !== null
            && isset($connection->user_id)
            && $connection->user_id == $user_id;
    };

    // Sends the current unix time to $connection once per second and removes
    // the timer as soon as the connection is no longer registered with the
    // worker. A helper is used so every timer gets its own $timer_id variable.
    $start_time_push = function($connection) use ($worker) {
        $timer_id = null;
        $timer_id = Timer::add(1, function() use ($worker, $connection, &$timer_id) {
            $id = $connection->id;
            $alive = isset($worker->connections[$id]) && $worker->connections[$id] === $connection;
            if (!$alive) {
                // Connection is gone: stop ticking instead of leaking the timer.
                Timer::del($timer_id);
                return;
            }
            $connection->send(json_encode(time()));
        });
    };

    // "push": notify every local connection of the addressed user.
    Channel\Client::on('push', function($event_data) use ($worker, $belongs_to) {
        $data = $event_data['content'];
        $user_id = isset($data['user_id']) ? $data['user_id'] : null;
        echo "workerID:{$worker->id}  connected\n";
        foreach ($worker->connections as $connection) {
            if ($belongs_to($connection, $user_id)) {
                $connection->send(json_encode(['action'=>'push']));
            }
        }
    });

    // "time_push": start a per-second time feed for the addressed user.
    // NOTE(review): every "timer" request adds another feed for the same
    // connection; confirm whether repeated requests are meant to stack.
    Channel\Client::on('time_push', function($event_data) use ($worker, $belongs_to, $start_time_push) {
        $data = $event_data['content'];
        $user_id = isset($data['user_id']) ? $data['user_id'] : null;
        foreach ($worker->connections as $connection) {
            if ($belongs_to($connection, $user_id)) {
                $start_time_push($connection);
            }
        }
    });

    // "over": a user logged in somewhere; close every OTHER connection that is
    // logged in as the same user (single-session enforcement).
    Channel\Client::on('over', function($event_data) use ($worker, $belongs_to) {
        $data = $event_data['content'];
        $user_id = isset($data['user_id']) ? $data['user_id'] : null;
        foreach ($worker->connections as $connection) {
            $worker_id = isset($connection->worker->id) ? $connection->worker->id : 0;
            $connection_id = isset($connection->id) ? $connection->id : 0;
            // Skip the connection that performed the login itself.
            if ($worker_id == $data['worker_id'] && $connection_id == $data['connection_id']) {
                continue;
            }
            if ($belongs_to($connection, $user_id)) {
                $connection->send(json_encode(['action'=>'xintiao','context'=>'你已经被挤下线']));
                // Close the superseded connection.
                $connection->close();
            }
        }
    });

    // Heartbeat: once per second, close connections that have been silent for
    // more than HEARTBEAT_TIME seconds.
    Timer::add(1, function() use ($worker) {
        $time_now = time();
        foreach ($worker->connections as $connection) {
            // The connection may not have sent anything yet; start its clock now.
            if (empty($connection->lastMessageTime)) {
                $connection->lastMessageTime = $time_now;
                continue;
            }
            // Silent for longer than the heartbeat timeout: treat the client
            // as offline and close the connection.
            if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                $connection->send(json_encode(['action'=>'xintiao','context'=>'你已经被强行下线']));
                $connection->close();
            }
        }
    });

    // "push_all": broadcast the payload to every local connection once per second.
    // NOTE(review): each event adds a new persistent timer that is never
    // deleted, so broadcasts accumulate — confirm whether a one-off send (or
    // replacing the previous timer) is what is actually wanted.
    Channel\Client::on('push_all', function($event_data) use ($worker) {
        $data = $event_data['content'];
        Timer::add(1, function() use ($worker, $data) {
            foreach ($worker->connections as $connection) {
                $connection->send(json_encode($data));
            }
        });
    });

    // "send": chat room — relay the message to every local connection.
    Channel\Client::on('send', function($event_data) use ($worker) {
        $data = $event_data['content'];
        // Both fields originate from a client frame, so either may be absent.
        $payload = json_encode(['action' => 'send', 'data' => [
            'send_user' => isset($data['send_user']) ? $data['send_user'] : null,
            'message'   => isset($data['message']) ? $data['message'] : null,
        ]]);
        foreach ($worker->connections as $connection) {
            $connection->send($payload);
        }
    });

//    // Message-queue (Beanstalk) consumer — currently disabled.
//    $config = [
//        'persistent' => false, // whether to use a persistent connection
//        'host' => '47.95.201.26',
//        'port' => 11300,  // port, 11300 by default
//        'timeout' =>30   // connection timeout
//    ];
//    $Client=new Client($config);  // instantiate the client
//    if(!$Client->connect()){
//        throw new \Exception('连接失败');
//    }  // connect to the server
//    $Client->useTube('test'); // select the tube to use
//    $Client->watch('test'); // add the tube to the watch list; the consumer reserves jobs from watched tubes
//    Timer::add(1, function()use($worker,$Client){
//        $res=$Client->reserve();  // fetch one job
//        $message_id=$res['id'];  // job id
//        if ($message_id < 1) {
//            return false;
//        }
//        var_dump($res);
//        $Client->delete($message_id); // remove the job from the queue
//    });
};
// Greet every new connection: the same line is written to stdout and sent to
// the client, identifying the worker and the connection id.
$worker->onConnect = function ($connection) use ($worker) {
    $greeting = "workerID:{$worker->id} connectionID:{$connection->id} connected\n";

    echo $greeting;
    $connection->send($greeting);
};

/**
 * Handle one frame received from a client.
 *
 * The frame is expected to be a JSON object with a string "action" field.
 * Supported actions: login, push, timer, db, push_all, send. Frames that are
 * not valid JSON objects, or that carry an unknown action, are ignored — but
 * they still refresh the heartbeat timestamp.
 *
 * @param object $connection the client connection that sent the frame
 * @param string $data       raw frame payload
 */
$worker->onMessage = function($connection, $data){
    // Record when this connection last spoke; the heartbeat timer reads it.
    $connection->lastMessageTime = time();
    // Database handle created in onWorkerStart.
    global $db;

    $worker = $connection->worker;
    $worker_id = $worker->id;
    $connection_id = $connection->id;

    $data = json_decode($data, true);
    // Reject anything that is not {"action": "<string>", ...}. Requiring a
    // string also avoids loose-comparison surprises (e.g. a numeric action 0
    // matching every action name on PHP < 8).
    if (!is_array($data) || !isset($data['action']) || !is_string($data['action'])) {
        return;
    }

    switch ($data['action']) {
        case 'login':
            // A login without a user_id would store null as the identity and
            // broadcast an "over" event for it, so refuse it up front.
            if (!isset($data['user_id'])) {
                $connection->send(json_encode(['action' => 'login', 'data' => ['code' => 1, 'message' => '登录失败：缺少user_id']]));
                break;
            }
            $connection->user_id = $data['user_id'];
            // Tell every process which connection now owns this user so the
            // "over" subscribers can close the others.
            $data['worker_id'] = $worker_id;
            $data['connection_id'] = $connection_id;
            Channel\Client::publish('over', array(
                'content'          => $data
            ));
            $connection->send(json_encode(['action' => 'login', 'data' => ['code' => 0, 'worker_id' => $worker_id, 'connection_id' => $connection_id, 'message' => '登录成功']]));
            break;

        case 'push':
            Channel\Client::publish('push', array(
                'content'          => $data
            ));
            break;

        case 'timer':
            Channel\Client::publish('time_push', array(
                'content'          => $data
            ));
            break;

        case 'db':
            // NOTE(review): every "db" frame adds a persistent one-second timer
            // that queries a fixed user id and broadcasts the row to all local
            // connections; nothing ever deletes it. Looks like demo code —
            // confirm before exposing it to real clients.
            Timer::add(1, function()use($worker,$db){
                $all_tables=$db->select('id,mobile')->from('users')->where('id= :id')->bindValues(array('id'=>410335))->row();
                foreach($worker->connections as $connection) {
                    $connection->send(json_encode($all_tables));
                }
            });
            break;

        case 'push_all':
            // The id is bound as a query parameter; without one there is
            // nothing to look up or broadcast.
            if (!isset($data['id'])) {
                break;
            }
            $all_tables=$db->select('id,mobile')->from('users')->where('id= :id')->bindValues(array('id'=>$data['id']))->row();
            Channel\Client::publish('push_all', array(
                'content'          => $all_tables
            ));
            break;

        case 'send':
            // Stamp the sender from the server-side identity; connections that
            // have not logged in are relayed with a null sender.
            $data['send_user'] = isset($connection->user_id) ? $connection->user_id : null;
            Channel\Client::publish('send', array(
                'content'          => $data
            ));
            break;
    }
};

// Connection teardown hook: only logs the disconnect to stdout.
$worker->onClose = function ($connection) {
    echo "connection closed\n";
};

// Run the workers.
Worker::runAll();